10時間くらいでYouTubeチャンネルの再生回数を取得するサーバレスな構築を作ってみた
データアナリティクス事業本部@札幌の佐藤です。
突然ですが皆さんはもう『アイカツオンパレード!ドリームストーリー』はご覧になられたでしょうか。
スターライト学園、ドリームアカデミー、四ツ星学園、ネオヴィーナスアーク、スターハーモニー学園の各代表者で競うドリームスクールグランプリ展開が楽しみですね。
YouTubeバンダイチャンネルで配信中ですので是非ご覧ください。
ということで、今回はYouTubeチャンネルの再生回数を毎日Amazon S3に格納するデータレイクっぽいものをサーバレスで構築した内容となります。
今回の構成
今回はこのような構成としました。
毎日0時にAmazon CloudWatchEventsからトリガーされAWS Lambdaを実行します。
AWS LambdaのコードはPythonで実装しています。Python用Google APIクライアントライブラリを使用してYouTubeにアクセス、チャンネルの新着上位50件の動画IDとタイトル、再生回数、アップロード日をtsv化してS3に格納しています。
通知は、LINE Notifyでシンプルに成功したかどうかを通知しています。
Amazon Simple Notification Service(SNS)にしなかった理由は、少し運用してみてLINEのほうが見るなと思ったからです。
Amazon DynamoDBは複数のYouTubeチャンネルに対応したかったという点、Amazon DynamoDBの練習のために使用しています。 Amazon DynamoDBにはYouTubeチャンネルIDやアウトプットファイル名を格納しています。
構成図上には記載していませんが、AWS Lambdaで使用するgoogle Cloud Platformで発行したAPIキーやLine NotifyのAPIキーなどは、すべてAWS Systems Manager パラメータストアに設定しています。
また、IAMについては、今回は各機能に対してフルアクセス権限を持つロールを使用しています。
再生回数を取得するYouTubeチャンネルは、バンダイチャンネルとアイチューブ(アイカツ!のYouTubeチャンネル)です。
そもそもなぜこのような構築を作ろうと思ったのか
愛です!
前々からアイチューブにアップロードされている動画の再生回数を自動で取得したいという強い思いがあり、ちょうど新シリーズがYouTube配信されるとのことで、やるなら今だなという判断に至ったためです。
(以前、『アイカツオンパレード!』開始前にアイドル27人の告知動画の再生回数取得を手動運用していたのですが、なかなか面倒くさかった)
また、コロナウィルスで在宅勤務メインとなり遠距離通勤者の私が自習できる時間が増えたという点や、1からサーバーレス環境を構築したことがなかったので勉強のためというきっかけがあったというのもひとつの要素です。
開発モチベーション
作業時間は1日2時間、夜寝る前に実施していました。
構築するにあたり知らない機能も多く概要調査から入ったり、Google Cloud PlatformのPython用Google APIクライアントライブラリも使用経験はなかったため詰まることも多かったです。
PoCしながら少しずつ大きくしたり変更したり「できてる感」を意識してモチベーションを落とさないようにしていました。
第1夜:Google APIクライアントライブラリを使ってYoutubeにアクセスして再生回数を取得する
第2夜:よくある一定時刻でトリガーしてAWS Lambdaで処理してメール送信するまでの仕組みを作ろう
第3夜:AWS LambdaのLayer機能を使って第1夜のライブラリを入れて実装しよう
第4夜:実装リソースのリファクタリングと、使い勝手的にメールではなくLINEで通知に変更しよう
本番:Amazon DynamoDBを使用して複数のYouTubeチャンネルを処理できるように変更しよう
結構ゆとりをもって終えられたのですが、いざ放送日に『アイカツオンパレード!ドリームストーリー』の配信先がアイチューブではなく、バンダイチャンネルということが発覚し、放送後「トワイライトエトランゼ」を聞きながらAmazon DynamoDBを利用するように一気に修正していきました。
「トワイライトエトランゼ」本当名曲。
LINEでの通知と格納ファイル
こんな感じでスタンプと一緒に結果が通知されます。
Amazon S3に格納されるファイルは以下のような感じです。
運用してみて
運用して2週間くらいですが、4月1日に上手く取得できず、手動リトライした以外は毎日問題なく動いています。
飲み会の最中に取得してたり、手動で再生回数を集めていたのが馬鹿らしかったなというのが素直な感想です。
1日1回しか起動しないためコスト面も全くかかっておらずやってよかったです。
細かい実装については以下に記載します。
実装
Lambdaの実装は以下になります。
import datetime import time import requests import boto3 from apiclient.discovery import build def lambda_handler(event, context): output_datetime = datetime.datetime.now().strftime('%Y%m%d%H%M%S') YOUTUBE_API_KEY = get_ssm_parameter_store('GOOGLE_YOUTUBE_API_KEY') AWS_S3_BUCKET_NAME = get_ssm_parameter_store('AIKATSU_YOUTUBE_OUTPUT_BUCKET') base_url = 'https://www.googleapis.com/youtube/v3' url = base_url + '/search?key=%s&channelId=%s&part=snippet,id&order=date&maxResults=50' # DynamoDBのデータの取得 table = get_resource('dynamodb').Table('aikatsu_crawler_channnel') tables = table.scan() for item in tables['Items']: videos = [] err_msg = '' output ='{}\t{}\t{}\t{}\n'.format('video_id', 'views', 'title', 'upload_timestamp') file_key = '{0}/{0}_{1}.tsv'.format(output_datetime, item['output_file_name']) youtube = build('youtube', 'v3', developerKey=YOUTUBE_API_KEY, cache_discovery=False) while True: res = requests.get(url % (YOUTUBE_API_KEY, get_ssm_parameter_store(item['channnel_id']))) if res.status_code != 200: err_msg +='{}\n'.format('リクエストコードNG') break result = res.json() videos=[ [item['id']['videoId'], item['snippet']['title'], item['snippet']['publishedAt']] for item in result['items'] if item['id']['kind'] == 'youtube#video' ] if len(videos) == 0: err_msg +='{}\n'.format('取得失敗') break for i in videos: viewCount = youtube.videos().list(part='statistics', id=i[0]).execute()['items'][0]['statistics']["viewCount"] output +='{}\t{}\t{}\t{}\n'.format(str(i[0]), str(viewCount), str(i[1]), str(i[2])) time.sleep(5) break s3 = get_resource('s3') r = s3.Object(AWS_S3_BUCKET_NAME, file_key).put(Body = output) post_line_messages(err_msg) # Lineへの通知 def post_line_messages(err_msg): url = 'https://notify-api.line.me/api/notify' LINE_NOTIFY_TOKEN = get_ssm_parameter_store('LINE_NOTIFY_API_KEY') headers = {'Authorization' : 'Bearer '+ LINE_NOTIFY_TOKEN} if err_msg == '': payload = {'message': '無事成功しました!', 'stickerPackageId': 2, 'stickerId': 144} else: payload = {'message': 'なんかエラーになってるよ…\r{}'.format(err_msg), 'stickerPackageId': 2, 'stickerId': 154} r = requests.post(url ,headers = headers ,params=payload) # SSMのパラメータストアの取得 def get_ssm_parameter_store(param, WithDecryption=True): ssm_client = boto3.client('ssm') res = ssm_client.get_parameters(Names=[param], WithDecryption=WithDecryption) return res['Parameters'][0]['Value'] # boto3のresourceの取得 def get_resource(type): return boto3.resource(type)
実装内容は各YouTubeチャンネル別ので新着上位50件の動画を取得し、そのひとつずつ動画の再生回数を取得、動画IDとタイトル、再生回数、アップロード日をtsv化してS3に格納しています。
実装にあたっては、Youtube Data API(v3)のリファレンスや、Qiita記事のYouTube Data api v3をPythonから使って特定のチャンネルの動画を取得するなどを参考にしました。
Python用Google APIクライアントライブラリやRequestsライブラリは、AWS Lambdaレイヤーを使用しています。 実装中一番手間取ったのもAWS Lambdaレイヤーに関する箇所のImportエラーでした。
詳細は該当の記事をご参照ください。
Python用Google APIクライアントライブラリのImportエラーが発生してしまう問題をなんとかする
また、各APIキーをそのままソースコードにさらしてしまうのはあまりよろしくない(私しか使いませんが)ので、パラメータは全てAWS Systems Manager パラメータストア上に設定しています。
また、アイチューブとバンダイチャンネル両方の情報を取得したかったので、各Amazon DynamoDB上に各パラメータストアのIDをセットしています。 (複合キーとして使用しています)
LINE NotifyはLINE Notify API Documentを参考にしつつ実装しました。 かなり簡単に実装できたので、メールならSNS、LINEであればLINE Notifyを使用するのが良いのかもと思いました。
最後に
想像されている以上にサーバレスな構築が簡単にできるので、手動運用していて面倒くさいことは置き換えてみるのはいかがでしょうか。
第2話以降、集計している再生回数の時系列データを比較したり、いじれるのが楽しみです。
『アイカツオンパレード!ドリームストーリー』の第2話、ノエルドリーム後編は4月11日(土)夜7時から配信予定です。 よろしくお願いします。